
use clap::Parser;
use crate::source::{SourceConf, source_run};
use crate::logical_plan::work_space::{WorkSpaceConf, work_space_run};
use crate::logical_plan::work_node::{work_node_run};
use std::collections::VecDeque;
use crate::restful::RestfulConf;
use crate::sink::{SinkConf, sink_run};
use crate::util::ModeLogConf;
use crate::logical_plan::work_node::conf::WorkNodeConf;
use tokio::runtime::Runtime;
use crate::indicators::{IndicatorsStoreConf, indicator_store_run};
use glob::glob;

#[derive(Deserialize)]
pub struct NormalNodeConf{
    pub sync_channel_size: usize,
    pub async_channel_size: usize,
}

#[derive(Deserialize,Debug)]
pub struct EventFlowNodeConf{
    pub includes : Option<Vec<String>>,
    pub source: Option<Vec<SourceConf>>,
    pub work_space: Option<Vec<WorkSpaceConf>>,
    pub work_node: Option<Vec<WorkNodeConf>>,
    pub sink: Option<Vec<SinkConf>>,
}

#[test]
fn node_from_yaml_test(){
    let yaml_string = match std::fs::read_to_string(std::path::Path::new("./conf/node.yaml")){
        Ok(t) => {t},
        Err(e) => {
            println!("read_to_string failed [{:?}]", e);
            std::process::exit(-1);
        },
    };
    let t: EventFlowNodeConf = match serde_yaml::from_str(yaml_string.as_str()){
        Ok(t) => {t},
        Err(e) => {
            println!("serde_yaml::from_str failed [{:?}]", e);
            std::process::exit(-1);
        },
    };
    println!("EventFlowNodeConf {:?}", t);
}
#[derive(Deserialize)]
pub struct NormalConf{
    pub snowflake_machine_id: usize,
    pub snowflake_node_id: usize,
    pub timer_channel_size: usize,
    pub sleep_while_null: u64,
}
#[derive(Deserialize)]
pub struct EventFlowNormalConf{
    pub normal: NormalConf,
    pub node: NormalNodeConf,
    pub restful: RestfulConf,
    pub indicator: Option<IndicatorsStoreConf>,
    pub mode_log: Option<Vec<ModeLogConf>>,
}

#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
pub struct EventFlowCmdConf{
    #[clap(short='l', long="log4rs", default_value="./conf/log4rs.yaml")]
    pub log4rs_yaml: String,
    #[clap(short='n', long="node", default_value="./conf/node.yaml")]
    pub node_yaml: String,
    #[clap(short='c', long="conf", default_value="./conf/normal.toml")]
    pub normal_toml: String,
}

lazy_static!{
    pub static ref EVENT_FLOW_CMD_CONF_INSTANCE: EventFlowCmdConf = {
        EventFlowCmdConf::parse()
    };
    pub static ref EVENT_FLOW_NORMAL_CONFINSTNCE: EventFlowNormalConf = {
        event_flow_normal_from_conf(&EVENT_FLOW_CMD_CONF_INSTANCE.normal_toml)
    };
    pub static ref EVENT_FLOW_NODE_CONF_INSTANCE: EventFlowNodeConf = {
        event_flow_node_from_conf(&EVENT_FLOW_CMD_CONF_INSTANCE.node_yaml)
    };
    pub static ref CONF_RUNTIME: Runtime = {
        match tokio::runtime::Builder::new_multi_thread()
            .thread_name("conf")
            .worker_threads(1)
            .max_blocking_threads(1)
            .enable_all()
            .build(){
            Result::Ok(t) => {
                t
            },
            Result::Err(e) => {
                error!("CONF_RUNTIME err");
                std::process::exit(-1);
            },
        }
    };
}

pub fn log4rs_init_local()->MapResult<()>{
    if let Err(e) = log4rs::init_file("./conf/log4rs.yaml", Default::default()){
        return MapResult::Err(MapErr::new(format!("log4rs init_file failed, {}", e)));
    }else{
        return MapResult::Ok(());
    }
}

pub fn event_flow_normal_from_conf(conf_file: &str)->EventFlowNormalConf{
    let toml_string = match std::fs::read_to_string(std::path::Path::new(conf_file)){
        Ok(t) => {t},
        Err(e) => {
            error!("normal conf init failed [{:?}]", e);
            std::process::exit(-1);
        },
    };

    match toml::from_str(&toml_string) {
        Ok(t) => {
            t
        },
        Err(e) => {
            error!("normal conf toml::from_str failed [{:?}]", e);
            std::process::exit(-1);
        },
    }
}
pub fn _event_flow_node_from_conf(conf_file: &str)->EventFlowNodeConf{
    let yaml_string = match std::fs::read_to_string(std::path::Path::new(conf_file)){
        Ok(t) => {t},
        Err(e) => {
            error!("node conf init failed [{:?}]", e);
            std::process::exit(-1);
        },
    };
    match serde_yaml::from_str(yaml_string.as_str()){
        Ok(t) => {t},
        Err(e) => {
            error!("node conf init failed [{:?}]", e);
            std::process::exit(-1);
        },
    }
}

impl EventFlowNodeConf {
    pub fn new() -> EventFlowNodeConf{
        let value = EventFlowNodeConf {
            includes:None,
            source:None,
            work_space:None,
            work_node:None,
            sink:None
        };
        value
    }
    
    pub fn get(&mut self, s: String) -> bool{
        match &mut self.includes {
            Some(a) => {
                for v in a.iter() {
                    if *v == s {
                        return true;
                    }
                }
            
                return false;
            }
            None => {
                return false;
            }
        }
    }

    pub fn read_conf(&mut self, conf_file: &str){
        for entry in glob(conf_file).expect("Failed to read glob pattern") {
            match entry {
                Ok(path) => {
                    let conf = _event_flow_node_from_conf(path.to_str().unwrap());

                    if let Some(inc) = &mut self.includes {
                        inc.push(path.to_str().unwrap().to_string());
                    } else {
                        self.includes = Some(vec!(path.to_str().unwrap().to_string()));
                    }

                    if let Some(mut a) =  conf.source {
                        if let Some(e) = &mut self.source {
                            e.append(&mut a) ;
                        } else {
                            self.source = Some(a);
                        }
                    }
            
                    if let Some(mut a) =  conf.work_space {
                        if let Some(e) = &mut self.work_space {
                            e.append(&mut a) ;
                        } else {
                            self.work_space = Some(a);
                        }
                    }
            
                    if let Some(mut a) =  conf.work_node {
                        if let Some(e) = &mut self.work_node {
                            e.append(&mut a) ;
                        } else {
                            self.work_node = Some(a);
                        }
                    }
            
                    if let Some(mut a) =  conf.sink {
                        if let Some(e) = &mut self.sink {
                            e.append(&mut a) ;
                        } else {
                            self.sink = Some(a);
                        }
                    }
                    
                    if let Some(v) = conf.includes {
                        for a in v {
                            for entry in glob(&a).expect("Failed to read glob pattern") {
                                match entry {
                                    Ok(path) => {
                                        if self.get(path.to_str().unwrap().to_string()) == false {
                                            self.read_conf(path.to_str().unwrap());
                                        }
                                    }
                                    Err(e) => {
                                        error!("event_flow_node_from_conf read_list failed [{:?}]", e);
                                        std::process::exit(-1);
                                    }
                                }
                            }
                        }
                    }
                }
                Err(e) => {
                    error!("event_flow_node_from_conf read_list failed [{:?}]", e);
                    std::process::exit(-1);
                }
            }
        }
    }
}

pub fn event_flow_node_from_conf(conf_file: &str)->EventFlowNodeConf{
    let mut conf = EventFlowNodeConf::new();
    conf.read_conf(conf_file);
    
    return conf;
}

#[inline]
pub fn mode_log_conf()->&'static Option<Vec<ModeLogConf>>{
    &EVENT_FLOW_NORMAL_CONFINSTNCE.mode_log
}

#[inline]
pub fn sink_conf()->&'static Option<Vec<SinkConf>>{
    &EVENT_FLOW_NODE_CONF_INSTANCE.sink
}

pub fn indicator_conf()->&'static Option<IndicatorsStoreConf>{
    &EVENT_FLOW_NORMAL_CONFINSTNCE.indicator
}

#[inline]
pub fn source_conf()->&'static Option<Vec<SourceConf>>{
    &EVENT_FLOW_NODE_CONF_INSTANCE.source
}

#[inline]
pub fn work_space_conf()->&'static Option<Vec<WorkSpaceConf>>{
    &EVENT_FLOW_NODE_CONF_INSTANCE.work_space
}

#[inline]
pub fn work_node_conf()->&'static Option<Vec<WorkNodeConf>>{
    &EVENT_FLOW_NODE_CONF_INSTANCE.work_node
}

#[inline]
pub fn normal_conf()->&'static NormalConf{
    &EVENT_FLOW_NORMAL_CONFINSTNCE.normal
}

#[inline]
pub fn restful_conf()->&'static RestfulConf{
    &EVENT_FLOW_NORMAL_CONFINSTNCE.restful
}

#[derive(Debug)]
pub struct MapErr {
    desc: VecDeque<String>,
}
impl MapErr {
    pub fn new(desc: String) -> MapErr {
        let mut tmp = VecDeque::new();
        tmp.push_back(desc);

        MapErr {
            desc: tmp,
        }
    }
    pub fn append(&mut self, desc: String){
        self.desc.push_back(desc);
    }
}

pub type MapResult<T> = Result<T, MapErr>;

pub fn conf_map_run()->MapResult<()>{

    info!("conf_map_run");

    let conf_runtime = &CONF_RUNTIME;

    info!("sink_run");
    conf_runtime.block_on(sink_run())?;

    info!("work_node_run");
    conf_runtime.block_on(work_node_run())?;

    info!("work_space_run");
    conf_runtime.block_on(work_space_run())?;

    info!("source_run");
    conf_runtime.block_on(source_run())?;

    indicator_store_run()?;

    return MapResult::Ok(());
}

